Implement cost-based optimizer for INTERSECTS join algorithm selection — Closes #67 (#76)
conradbzura wants to merge 28 commits into main from
Conversation
Introduces the giql-datafusion Rust crate, a DataFusion PhysicalOptimizerRule that uses Parquet metadata and lightweight sampling to choose between sweep-line and binned equi-join algorithms for interval overlap joins.

The optimizer collects statistics in two tiers:
- Tier 1 (free): row group column stats, sorting_columns, and page index presence from the Parquet file footer
- Tier 2 (milliseconds): width distribution (median, p95, p99, CV) from sampling the start/end columns of 1-3 representative row groups

The decision function short-circuits to sweep line when the width distribution is heavy-tailed (p99/median > 10) or high-variance (CV > 1.5), and falls back to a cost comparison for uniform distributions where binning is viable.

Both algorithms are implemented as custom ExecutionPlan nodes:
- SweepLineJoinExec: sort + sweep with active set, O(n log n + k)
- BinnedJoinExec: bin expansion + hash map probe + dedup, O(n*r + k)
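In rough pseudocode-like Rust, the decision function described above might look like this (the WidthStats struct and the boolean cost-comparison stub are illustrative, not the crate's actual API):

```rust
/// Width-distribution summary from Tier 2 sampling.
/// Field and type names here are illustrative, not the crate's API.
struct WidthStats {
    median: f64,
    p99: f64,
    cv: f64, // coefficient of variation: stddev / mean
}

#[derive(Debug, PartialEq)]
enum JoinStrategy {
    SweepLine,
    Binned,
}

/// Heavy-tailed or high-variance width distributions short-circuit to the
/// sweep line; otherwise a cost comparison (stubbed here as a bool) decides.
fn choose_strategy(stats: &WidthStats, binned_is_cheaper: bool) -> JoinStrategy {
    let heavy_tailed = stats.p99 / stats.median > 10.0;
    let high_variance = stats.cv > 1.5;
    if heavy_tailed || high_variance || !binned_is_cheaper {
        JoinStrategy::SweepLine
    } else {
        JoinStrategy::Binned
    }
}
```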
DataFusion v47 returns StringViewArray (not StringArray) for string columns read from Parquet. The sweep-line and binned join execution plans were downcasting to StringArray only, causing runtime errors when the optimizer replaced default joins. Now both string types are handled via a fallback chain.
Unit tests cover stats types, Parquet sampling, cost model edge cases, config defaults, pruning bounds, and optimizer registration. Integration tests exercise the full pipeline through DataFusion: create Parquet files, register with SessionContext, execute INTERSECTS join SQL, and verify correctness for overlapping, non-overlapping, cross-chromosome, adjacent, and containment interval scenarios. Total: 38 tests (31 unit + 6 integration + 1 doc-test).
…detection

Three correctness bugs fixed:
1. Sweep-line active set: the retain predicate was removing wide right intervals based on r.start >= l.end, but those intervals could still overlap a later, wider left interval. Now only truly expired intervals (r.end <= l.start) are removed, and the full overlap check is done inline when emitting matches.
2. Multi-partition collection: RepartitionExec uses shared channels that break under sequential partition reads. Replaced manual sequential collection with datafusion::physical_plan::collect, which spawns all partitions concurrently.
3. Filter column resolution: DataFusion may order filter operands as (right.start < left.end) instead of (left.start < right.end). The pattern detector now resolves columns by name and join side rather than assuming positional semantics.

Also handles Int32 and StringViewArray column types that DataFusion v47 produces from Parquet reads.
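A minimal sketch of the corrected active-set pruning from fix 1, using a hypothetical Iv struct rather than the crate's actual types (inputs are assumed sorted by start, intervals half-open [start, end)):

```rust
#[derive(Clone, Copy)]
struct Iv {
    start: i64,
    end: i64,
}

/// One sweep step for the current left interval `l` against the active set
/// of right intervals.
fn sweep_step(active: &mut Vec<Iv>, l: Iv, out: &mut Vec<(Iv, Iv)>) {
    // Correct eviction: drop only truly expired intervals (r.end <= l.start).
    // The old predicate `r.start >= l.end` wrongly evicted wide right
    // intervals that could still overlap a later, wider left interval.
    active.retain(|r| r.end > l.start);
    for r in active.iter() {
        // Full overlap check done inline when emitting matches.
        if l.start < r.end && r.start < l.end {
            out.push((l, *r));
        }
    }
}
```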
Instead of a single-threaded custom BinnedJoinExec, the optimizer
now rewrites the physical plan to compose DataFusion's own parallel
infrastructure:
BinExpandExec(left)  ─┐
                      ├─ HashJoinExec(on=[chrom, __giql_bin])
BinExpandExec(right) ─┘           │
                          FilterExec (canonical-bin dedup)
                                  │
                          ProjectionExec (strip extra cols)
BinExpandExec is a stateless per-partition node that replicates each
interval into rows for every genome bin it touches, adding __giql_bin
and __giql_first_bin columns. The canonical-bin filter ensures each
pair is emitted exactly once by keeping only the match from
max(left_first_bin, right_first_bin).
When no Parquet stats are available, the optimizer now defers to
DataFusion's built-in join rather than defaulting to sweep line.
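The bin replication and canonical-bin dedup described above can be illustrated in standalone Rust (BIN_SIZE and the function names are hypothetical; intervals are half-open [start, end)):

```rust
const BIN_SIZE: i64 = 10_000; // illustrative; chosen adaptively in practice

/// Genome bins an interval touches: start/B ..= (end-1)/B.
/// BinExpandExec replicates each interval into one row per such bin.
fn bins(start: i64, end: i64) -> std::ops::RangeInclusive<i64> {
    (start / BIN_SIZE)..=((end - 1) / BIN_SIZE)
}

/// Canonical-bin dedup: a (left, right) pair sharing several bins is joined
/// once per shared bin; keep only the match whose join bin equals
/// max(left_first_bin, right_first_bin), so each pair is emitted once.
fn is_canonical(join_bin: i64, left_first_bin: i64, right_first_bin: i64) -> bool {
    join_bin == left_first_bin.max(right_first_bin)
}
```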
CLI tool that runs INTERSECTS join queries through DataFusion with the optimizer registered, outputting JSON timing results per rep. Supports --no-optimizer flag for baseline comparison and --op join|pairs for different query shapes.
Restructure the sweep-line algorithm for three key speedups:
1. Per-chromosome parallelism: intervals on different chromosomes cannot overlap, so each chromosome is swept independently via tokio::spawn. This scales with the number of chromosomes (~23 for the human genome).
2. Integer chromosome IDs: map chromosome strings to dense u32 IDs at collection time, eliminating String allocations and HashMap lookups during the hot sort/sweep loop.
3. Vectorized output: collect match indices as u32 arrays and use arrow::compute::take once per column instead of per-row slice + concat.

At 500K intervals/side, heavy-tail goes from 0.56s to 0.11s (5x) and moderate from 0.19s to 0.10s (2x); both are 100-120x faster than naive.
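Speedup 2 amounts to string interning; a minimal stand-alone sketch (ChromInterner is a hypothetical name, not the crate's type):

```rust
use std::collections::HashMap;

/// Maps chromosome names to dense u32 IDs once at collection time, so the
/// hot sort/sweep loop compares integers instead of allocating or hashing
/// strings.
#[derive(Default)]
struct ChromInterner {
    ids: HashMap<String, u32>,
    names: Vec<String>, // ID -> name, for reconstructing output
}

impl ChromInterner {
    fn intern(&mut self, name: &str) -> u32 {
        if let Some(&id) = self.ids.get(name) {
            return id;
        }
        let id = self.names.len() as u32;
        self.ids.insert(name.to_string(), id);
        self.names.push(name.to_string());
        id
    }
}
```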
The sweep-line exec now declares required_input_ordering of (chrom ASC, start ASC) on both children so DataFusion inserts SortExec nodes automatically when inputs are unsorted. The cost model picks the smaller side (by row count from Parquet metadata) as the build side to materialize, reducing peak memory for asymmetric joins. The SmallSide enum replaces the previous skip_sort flag throughout the optimizer and cost model.
Replace the collect-both-sides approach with a proper poll_next state machine that follows DataFusion's build/probe pattern: WaitBuildSide → FetchProbeBatch → process → FetchProbeBatch → ...

The build side (smaller, selected by the cost model) is materialized into a sorted, chromosome-indexed BuildSideData struct. The probe side is streamed batch-by-batch via a SendableRecordBatchStream. Each probe batch is swept against the build side per chromosome, producing output immediately via vectorized compute::take.

Multi-partition probe inputs are coalesced via CoalescePartitionsExec to handle DataFusion's RepartitionExec transparently. Chromosome matching between build and probe uses string names rather than independently-assigned integer IDs, avoiding mismatches when batches contain different chromosome subsets.
Instead of a separate FilterExec + ProjectionExec after the HashJoinExec, the canonical-bin dedup condition is now embedded directly in the JoinFilter expression. The HashJoinExec's own projection parameter strips the extra columns. This reduces the binned plan from 4 nodes (BinExpandExec → HashJoinExec → FilterExec → ProjectionExec) to 2 nodes (BinExpandExec → HashJoinExec), eliminating two intermediate RecordBatch materializations. The remaining performance gap vs the pure SQL binning approach is attributable to the DataFusion engine version (v47 vs v52), not the plan structure.
Sets p99_median and CV thresholds to MAX so the cost model always selects the binned strategy, useful for isolating binned-path performance independently of distribution characteristics.
Closes the 1.6x engine performance gap between our Rust crate and the Python DataFusion package (v52).

API migration:
- properties() now returns &Arc<PlanProperties>
- required_input_ordering() returns Vec<Option<OrderingRequirements>>
- HashJoinExec::try_new takes a NullEquality enum + null_aware bool
- LexRequirement::new returns Option (non-empty invariant)
Runs the pure SQL binned join query through the same Rust DataFusion engine, enabling apples-to-apples comparison of the SQL approach vs the physical plan rewrite on the same engine version.
Both the physical plan rewrite (BinExpandExec) and SQL re-planning (BinnedSqlExec) add more overhead than they save vs DataFusion's built-in hash join on chrom + range filter. The optimizer now defers to DataFusion for uniform-width data. The sweep-line remains the primary optimization path, delivering 100x+ speedup for heavy-tailed distributions.
Adds IntersectsLogicalRule that rewrites interval overlap joins to UNNEST-based binned equi-joins at the logical level, enabling DataFusion's native UNNEST, hash join, and DISTINCT to run with full parallelism. The rule is disabled by default (enable_logical_rule config flag) because it has a known schema bug with wide intervals spanning many bins. The physical sweep-line rule remains the active optimizer. When enabled and working correctly for uniform data, it matches the performance of hand-written SQL binned joins (~0.012s at 500K) by letting DataFusion handle the entire execution pipeline natively. Also includes BinnedSqlExec (SQL re-planning exec) which was explored as an alternative approach but found to have higher overhead than the logical rewrite due to context creation and materialization costs.
Three fixes to the UNNEST-based binned join logical rule:
1. SubqueryAlias on expanded sides preserves table qualifiers so the join filter (a.start < b.end) resolves correctly after UNNEST transforms the schema.
2. DISTINCT-before-PROJECT ordering prevents DataFusion's projection-pushdown from folding column selection into the join, which caused column count mismatches at runtime.
3. Canonical-bin filter with CAST(start AS BIGINT) / bin_size eliminates multi-bin duplicates without DISTINCT on the full output, and handles Int32 start columns correctly.

The logical rule now matches hand-written SQL-binned performance within 10% across all distribution profiles (0.016-0.024s at 500K) and is enabled by default.
The logical rule now reads Parquet file footer metadata (row group column statistics) to compute the bin size adaptively. The width signal max(end) - max(start) approximates the typical interval width and is used directly as the bin size, clamped to [1K, 1M]. When a ListingTable is detected, the rule downcasts through DefaultTableSource to access table_paths(), reads the first file's footer via collect_metadata(), and extracts per-row-group min/max bounds for start and end columns. Falls back to the 10K default only if no Parquet metadata is accessible (e.g., in-memory Arrow tables). Also fixes the bench binary to respect the config default for enable_logical_rule instead of always overriding to false.
The canonical-bin filter already ensures each interval pair is emitted exactly once, making DISTINCT a no-op that wastes cycles hashing all output rows. Removing it closes the gap with hand-written SQL from 1.2-1.6x to 1.1-1.4x.
Cover the logical rule's join detection, rewrite correctness, adaptive bin sizing, canonical-bin dedup, edge cases, and full pipeline integration through DataFusion. Total: 65 tests (31 unit + 27 logical rule + 6 integration + 1 doc).
Code Review
…ug output

Blocking fixes from code review:
1. Replace the fragile is_from_left alphabetical heuristic with schema-based column resolution. The function now checks the column's qualified name against the join's left child schema via DFSchema::has_column(), correctly handling any table alias.
2. Replace all eprintln! debug statements with log::debug! across logical_rule.rs, optimizer.rs, and pattern.rs (17 instances).

Also fixes:
- Doc comment for enable_logical_rule now matches the true default
- Remove unnecessary unsafe Pin::new_unchecked in sweep_line.rs
- Add a test for non-a/l table aliases (peaks/genes) to verify the fix
The transpile() function now accepts dialect="datafusion" which emits giql_intersects(start, end, start, end) function calls for column-to-column INTERSECTS joins instead of expanding to raw overlap predicates. This preserves INTERSECTS semantics through the SQL layer so a DataFusion logical optimizer rule can match on the function call directly, without heuristic column-name pattern detection. Literal range queries and CONTAINS/WITHIN fall through to the base generator unchanged.
The logical optimizer rule now matches on giql_intersects() function calls emitted by the GIQL transpiler's datafusion dialect, instead of reverse-engineering overlap predicates from column name heuristics. A placeholder giql_intersects ScalarUDF is registered so DataFusion's SQL parser accepts the function call. The logical rule rewrites it to a binned equi-join with adaptive bin sizing, replacing the function call with real overlap predicates before execution.

This eliminates the physical optimizer, cost model, sweep line executor, sampling infrastructure, and heuristic pattern matching (15 files, ~4,000 lines removed). The binned join approach with adaptive bin sizing from table statistics is sufficient for all interval width distributions.

BREAKING CHANGE: register_optimizer() no longer accepts IntersectsOptimizerConfig. The IntersectsOptimizerConfig struct, JoinStrategy enum, and physical optimizer rule are removed.
The previous width estimate max(end) - max(start) only measured the width of the interval with the largest start coordinate, which could be catastrophically wrong for bimodal data. Replace with two independent width signals — min(end)-min(start) and max(end)-max(start) — and take the max for robustness. Also: use schema qualifier instead of walking to TableScan for SubqueryAlias resolution (fixes self-join alias collisions), use exact column name matching for bin column filtering, and guard against negative width values before i64-to-usize cast.
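The two-signal estimate can be sketched as follows (the function name is hypothetical; inputs are column-level min/max statistics):

```rust
/// Two independent width estimates from column-level min/max stats; taking
/// the max is robust to bimodal data where either signal alone can be
/// catastrophically wrong. Negative values from malformed stats are clamped
/// to zero before any cast to an unsigned type.
fn estimate_width(min_start: i64, max_start: i64, min_end: i64, max_end: i64) -> i64 {
    let w1 = min_end - min_start; // width near the smallest start
    let w2 = max_end - max_start; // width near the largest start
    w1.max(w2).max(0)
}
```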
Library crates should not commit Cargo.lock per Rust convention. Also ignore the target/ build directory.
Add 5 Python tests for the datafusion dialect parameter (join emits giql_intersects, literal range unchanged, contains unchanged, default dialect unchanged, invalid dialect raises ValueError). Add 2 Rust integration tests: self-join (same table on both sides) and compound predicates (extra filter alongside giql_intersects). Also wrap the datafusion generator import in transpile.py with a try/except that produces a clear error message when the package is not installed.
The column-level min/max heuristic for bin sizing fails when the widest interval is in the middle of the coordinate space — neither at min(start) nor max(end). Both estimates can simultaneously underestimate, leading to massive bin replication. Read actual start/end values from 1–3 representative Parquet row groups and compute p95 interval width directly. This is used as Tier 1 for bin sizing (sampled p95), falling back to the column-level heuristic (Tier 2) for non-Parquet sources. Sampling adds ~0.2ms to planning — under 1% of total query time.
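The sampled-p95 computation is roughly the following stand-alone sketch (not the crate's actual sampling code; non-positive widths from malformed intervals are filtered, as described above):

```rust
/// p95 interval width over sampled start/end values (Tier 1 bin sizing).
/// Returns None when no valid widths remain after filtering.
fn p95_width(starts: &[i64], ends: &[i64]) -> Option<i64> {
    let mut widths: Vec<i64> = starts
        .iter()
        .zip(ends)
        .map(|(s, e)| e - s)
        .filter(|w| *w > 0) // drop malformed (non-positive) widths
        .collect();
    if widths.is_empty() {
        return None;
    }
    widths.sort_unstable();
    // Index of the 95th percentile in the sorted widths.
    let idx = (widths.len() as f64 * 0.95).ceil() as usize - 1;
    Some(widths[idx.min(widths.len() - 1)])
}
```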
Cap sampled rows to 300K and filter non-positive widths to handle malformed intervals and bound memory for huge row groups. Add assert!(!result.transformed) to join-type skip tests that previously discarded the result. Remove misleading try/except ImportError from transpile.py since the datafusion generator ships with giql. Add Rust gitignore patterns to root .gitignore.
Replace the binned equi-join with a cache-oblivious interval tree (coitrees crate) as the default join strategy for INTERSECTS. The COI tree stores each interval exactly once — no bin replication — and queries in O(log N + k) regardless of width distribution. Benchmarks at 100K intervals/side show the COI tree path is faster than fixed-10K binning across all tested distributions, including 2-5x on uniform data and 37x on pathological middle-wide data.

The binned equi-join path is retained behind IntersectsConfig with force_binned = true for future benchmarking. The adaptive bin sizing (Parquet sampling + cost-optimal binary search) is preserved for that path.

Architecture:
- COITreeJoinNode: UserDefinedLogicalNode emitted by the logical rule
- COITreeExec: build/probe ExecutionPlan using per-chromosome COITrees
- COITreePlanner: ExtensionPlanner converting logical to physical
- GiqlQueryPlanner: registers the extension planner on SessionState
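The per-chromosome build/probe shape can be illustrated with a naive linear-scan stand-in (a real implementation would query a coitrees interval tree in O(log N + k); all names here are hypothetical):

```rust
use std::collections::HashMap;

/// Build side keyed by chromosome; each interval is stored exactly once.
/// A coitrees COITree would replace the Vec + linear scan below.
struct BuildSide {
    by_chrom: HashMap<String, Vec<(i64, i64, usize)>>, // (start, end, build row)
}

impl BuildSide {
    /// Build once from (chrom, start, end) rows.
    fn build(rows: &[(&str, i64, i64)]) -> Self {
        let mut by_chrom: HashMap<String, Vec<(i64, i64, usize)>> = HashMap::new();
        for (idx, &(chrom, start, end)) in rows.iter().enumerate() {
            by_chrom.entry(chrom.to_string()).or_default().push((start, end, idx));
        }
        BuildSide { by_chrom }
    }

    /// Build-side row indices overlapping a half-open probe interval.
    fn probe(&self, chrom: &str, start: i64, end: i64) -> Vec<usize> {
        self.by_chrom
            .get(chrom)
            .map(|ivs| {
                ivs.iter()
                    .filter(|&&(s, e, _)| start < e && s < end)
                    .map(|&(_, _, idx)| idx)
                    .collect()
            })
            .unwrap_or_default()
    }
}
```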
Summary
Add a DataFusion logical optimizer rule that rewrites INTERSECTS joins into binned equi-joins with adaptive bin sizing. Instead of reverse-engineering overlap predicates from column-name heuristics after transpilation, introduce a "datafusion" dialect to the GIQL transpiler that emits giql_intersects() function calls, preserving INTERSECTS semantics through the SQL layer so the optimizer can match directly.

Bin size is chosen using two-tier statistics: Tier 1 reads actual start/end values from 1–3 representative Parquet row groups and computes p95 interval width (~0.2ms overhead, capped at 300K sampled rows). Tier 2 falls back to column-level min/max estimates for non-Parquet sources. DataFusion handles parallelism, hash join execution, and disk spilling natively.
Closes #67
Proposed changes
Transpiler dialect support (Python)
Add a dialect parameter to transpile() with a "datafusion" option. A new DataFusionGIQLGenerator subclass overrides _generate_column_join() for INTERSECTS to emit giql_intersects(start, end, start, end) instead of raw overlap predicates. The chrom equi-key is preserved as standard SQL for hash partitioning. Literal range queries and CONTAINS/WITHIN fall through to the base generator unchanged.

Placeholder UDF (lib.rs)

Register a giql_intersects ScalarUDF on the DataFusion session state so the SQL parser accepts the function call. The UDF is never executed — the logical rule rewrites it away before execution. If the rule is missing, the UDF returns a clear error message.

Logical optimizer rule (logical_rule.rs)

The IntersectsLogicalRule detects giql_intersects() in join filters and rewrites to:
- range(start/B, (end-1)/B+1) bin columns via UNNEST
- an equi-join on (chrom, bin_id) plus the original overlap filter (with giql_intersects() replaced by real start < end AND end > start predicates)
- a canonical-bin filter (__giql_bin == CASE WHEN left_first_bin >= right_first_bin THEN left_first_bin ELSE right_first_bin END) that eliminates multi-bin duplicates without DISTINCT

Table aliases for the UNNEST SubqueryAlias are derived from the plan's schema qualifier rather than walking to the underlying TableScan, which correctly handles SQL aliases and self-joins.
Two-tier adaptive bin sizing (logical_rule.rs)

Tier 1 — Parquet sampling (~0.2ms): Read start/end columns from 1–3 representative row groups (first, middle, last) using the parquet crate's Arrow reader with projection pushdown. Compute actual interval widths (filtering non-positive values from malformed data) and return p95. Sampling is capped at 300K rows to bound memory for very large row groups. This handles all width distributions correctly, including "middle-wide" data where the widest interval has neither the smallest start nor the largest end. For remote sources (s3://, gs://), sampling fails gracefully and falls through to Tier 2.

Tier 2 — Column-level heuristic (fallback): For non-Parquet sources, estimate width from max(min(end)-min(start), max(end)-max(start)) — two independent estimates from column-level min/max stats, taking the max for robustness.

Both tiers clamp to [1,000, 1,000,000] and fall back to a default of 10,000 when no statistics are available.
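The tier fallback and clamping amount to a few lines (constants and names here are illustrative sketches of the behavior described above):

```rust
const MIN_BIN: i64 = 1_000;
const MAX_BIN: i64 = 1_000_000;
const DEFAULT_BIN: i64 = 10_000;

/// Prefer the sampled p95 (Tier 1), then the column-stats heuristic
/// (Tier 2), clamping either to [1K, 1M]; fall back to the 10K default
/// when no statistics are available.
fn bin_size(tier1_p95: Option<i64>, tier2_estimate: Option<i64>) -> i64 {
    tier1_p95
        .or(tier2_estimate)
        .map(|w| w.clamp(MIN_BIN, MAX_BIN))
        .unwrap_or(DEFAULT_BIN)
}
```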
Build configuration

Add Rust gitignore patterns (target/, Cargo.lock) to both the root .gitignore and a crate-level .gitignore. The parquet crate is a runtime dependency for Tier 1 sampling.

Test cases
- logical_rule_test — Rust unit tests for the logical rule (join detection, rewrite correctness, adaptive bin sizing, canonical-bin dedup, edge cases)
- intersects_logical_binned — full-pipeline integration tests through DataFusion
- lib::tests — crate unit tests
- TestTranspileDataFusionDialect — Python tests for the datafusion dialect parameter